package com.shujia.flink.core

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Demo4ProcessingTime {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val wordsDS: DataStream[String] = linesDS.flatMap(_.split(","))

    val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))


    /**
     * 统计单词的数量，每隔5秒统计一次，计算最近5秒单词的数量
     *
     * TumblingProcessingTimeWindows: 滚动的处理时间窗口
     *
     */
    kvDS
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()

    env.execute()
  }

}
